package com.atguigu.app.dwd;

import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @className: DwdToolCouponGet
 * @author: LinCong
 * @description:    工具域优惠券领取事务事实表
 * @date: 2023/2/4 17:39
 * @version: 1.0
 */

//业务服务器（mysql）-> maxwell -> kafka -> flink(DwdToolCouponGet) -> kafka
public class DwdToolCouponGet {
    public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        ////        1.1、开启checkpoint
//        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
//        //设置checkpoint的超时时间,如果 Checkpoint在 10分钟内尚未完成说明该次Checkpoint失败,则丢弃。(默认10分钟)
//        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000L);
//        //固定延迟重启   （最多重启次数，每次重启的时间间隔）
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
////        1.2、设置状态后端
//        env.setStateBackend(new HashMapStateBackend());
//        System.setProperty("HADOOP_USER_NAME", "kevin");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop3cluster/211126/ck");

        // TODO 3. 从 Kafka 读取业务数据，封装为 Flink SQL 表
        tableEnv.executeSql("create table `topic_db`( " +
                "`database` string, " +
                "`table` string, " +
                "`data` map<string, string>, " +
                "`type` string, " +
                "`ts` string " +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_get"));

        // TODO 4. 读取优惠券领用数据，封装为表
        Table resultTable = tableEnv.sqlQuery("select " +
                "data['id'], " +
                "data['coupon_id'], " +
                "data['user_id'], " +
                "date_format(data['get_time'],'yyyy-MM-dd') date_id, " +
                "data['get_time'], " +
                "ts " +
                "from topic_db " +
                "where `table` = 'coupon_use' " +
                "and `type` = 'insert' ");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 5. 建立 Kafka-Connector dwd_tool_coupon_get 表
        tableEnv.executeSql("create table dwd_tool_coupon_get ( " +
                "id string, " +
                "coupon_id string, " +
                "user_id string, " +
                "date_id string, " +
                "get_time string, " +
                "ts string " +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_get"));

        // TODO 6. 将数据写入 Kafka-Connector 表
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_get select * from result_table");
    }
}
